perf: allow pubsub actor messages to be handled in parallel #659
Conversation
Walkthrough
Changes
Sequence Diagram
```mermaid
sequenceDiagram
    participant Main Loop
    participant Pending Messages
    participant Handler
    Note over Main Loop: Before: Sequential processing
    Main Loop->>Handler: Await handle_msg(msg1)
    Handler-->>Main Loop: Complete
    Main Loop->>Handler: Await handle_msg(msg2)
    Handler-->>Main Loop: Complete
    Note over Main Loop: After: Concurrent processing
    Main Loop->>Pending Messages: Push handle_msg(msg1)
    Main Loop->>Pending Messages: Push handle_msg(msg2)
    Main Loop->>Pending Messages: Await next() completion
    par Concurrent
        Pending Messages->>Handler: Process msg1
        Pending Messages->>Handler: Process msg2
    and
        Handler-->>Pending Messages: Complete msg1
        Handler-->>Pending Messages: Complete msg2
    end
    Pending Messages-->>Main Loop: Return completed future
```
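To make the "After" flow concrete, here is a minimal sketch of such a loop, assuming tokio and futures_util. The names `handle_msg`, `messages_receiver`, and `pending_messages` mirror the PR, but the body is an illustration of the pattern, not the actual `ChainPubsubActor` code:

```rust
use futures_util::stream::{FuturesUnordered, StreamExt};
use tokio::sync::mpsc;

// Placeholder message type; the real actor has its own enum.
struct Msg;

async fn handle_msg(_msg: Msg) {
    // ... per-message work (subscribe, unsubscribe, reconnect, ...)
}

async fn run_actor(mut messages_receiver: mpsc::Receiver<Msg>) {
    // In-flight handle_msg futures, polled concurrently.
    let mut pending_messages = FuturesUnordered::new();
    loop {
        tokio::select! {
            maybe_msg = messages_receiver.recv() => match maybe_msg {
                // Push the future instead of awaiting it inline, so a
                // slow message no longer blocks the ones behind it.
                Some(msg) => pending_messages.push(handle_msg(msg)),
                // Channel closed: stop (note this drops any futures
                // still pending in the set).
                None => break,
            },
            // Drive whichever in-flight message completes first.
            Some(()) = pending_messages.next(), if !pending_messages.is_empty() => {}
        }
    }
}
```

With `select!`, the loop keeps accepting new messages while earlier ones are still in flight, which is where the throughput win comes from.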
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
Pre-merge checks and finishing touches: ❌ Failed checks (1 warning) | ✅ Passed checks (4 passed)
Manual Deploy Available: you can trigger a manual deploy of this PR branch to testnet. (Comment updated automatically when the PR is synchronized.)
bmuddha left a comment:
LGTM
Force-pushed: 9004162 to e9c3a8f
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (1)
magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs (2 hunks)
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-11-07T14:20:31.457Z
Learnt from: thlorenz
Repo: magicblock-labs/magicblock-validator PR: 621
File: magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs:457-495
Timestamp: 2025-11-07T14:20:31.457Z
Learning: In magicblock-chainlink/src/remote_account_provider/chain_pubsub_client.rs, the unsubscribe closure returned by PubSubConnection::account_subscribe(...) resolves to () (unit), not a Result. Downstream code should not attempt to inspect an unsubscribe result and can optionally wrap it in a timeout to guard against hangs.
Applied to files:
magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs
📚 Learning: 2025-11-19T09:34:37.917Z
Learnt from: thlorenz
Repo: magicblock-labs/magicblock-validator PR: 621
File: test-integration/test-chainlink/tests/ix_remote_account_provider.rs:62-63
Timestamp: 2025-11-19T09:34:37.917Z
Learning: In test-integration/test-chainlink/tests/ix_remote_account_provider.rs and similar test files, the `_fwd_rx` receiver returned by `init_remote_account_provider()` is intentionally kept alive (but unused) to prevent "receiver dropped" errors on the sender side. The pattern `let (remote_account_provider, _fwd_rx) = init_remote_account_provider().await;` should NOT be changed to `let (remote_account_provider, _) = ...` because dropping the receiver would cause send() operations to fail.
Applied to files:
magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: Build Project
- GitHub Check: run_make_ci_test
- GitHub Check: run_make_ci_lint
```rust
use futures_util::stream::FuturesUnordered;
```
🧩 Analysis chain
Concurrent handling via FuturesUnordered breaks subscribe→unsubscribe ordering assumptions and can leak subscriptions

The new FuturesUnordered-based loop makes `handle_msg` futures for all message types run concurrently. Because of how `handle_msg` is structured:

- `AccountSubscribe`: calls `Self::add_sub(...).await`. `add_sub` inserts the `AccountSubscription` into `subs` before its first `.await` (`account_subscribe().await`).
- `AccountUnsubscribe`: has no `.await`; its entire branch is synchronous and completes on the first poll of its future.

With FuturesUnordered (and without any explicit ordering guarantees), there is now a valid interleaving where:

1. A subscribe message for `pubkey` is received first and its `handle_msg` future is pushed into `pending_messages`.
2. A subsequent unsubscribe message for the same `pubkey` is received and its `handle_msg` future is also pushed.
3. When `pending_messages.next()` runs, it may poll the unsubscribe future before the subscribe future's first poll.
4. In that case:
   - The unsubscribe branch checks `subs.lock().get(&pubkey)` and finds no entry yet, so it returns `AccountSubscriptionDoesNotExist`.
   - Afterwards, the subscribe future is polled for the first time, and `add_sub` inserts the `AccountSubscription` into the map and sets up the real subscription.

This results in:

- The caller logically issuing subscribe then unsubscribe, but seeing an unsubscribe error (`AccountSubscriptionDoesNotExist`) while still ending up with a live subscription (because `add_sub` completes later and spawns the update task).
- A leaked subscription that the caller believes was removed.
- The comment in `add_sub` about "messages to this actor are processed in the order they are sent" no longer being true under the concurrent FuturesUnordered processing.

This is a semantic correctness regression introduced by the concurrency change, not just a theoretical concern.

Consider one of the following fixes (the first is sketched below):

- Restrict concurrency to subscription establishment only:
  - Keep the actor loop sequential for control messages (`AccountUnsubscribe`, `Reconnect`).
  - For `AccountSubscribe`, enqueue/track only the long-running "establish subscription" work in FuturesUnordered (e.g., a dedicated async function that performs `account_subscribe` plus the spawn of the update loop), while the main `handle_msg` path that updates `subs` and returns the oneshot response preserves message ordering.
- Explicitly track in-flight subscriptions:
  - Maintain a per-`Pubkey` state for "subscription being created".
  - Treat an unsubscribe for an in-flight subscription as success and arrange for the creation path to observe and honor that cancellation (e.g., cancel before or immediately after adding to `subs`).

Additionally, on shutdown or when `messages_receiver` is closed, the loop breaks and drops `pending_messages`, abruptly canceling in-flight `handle_msg` futures. This is a behavior change from the previous sequential implementation, where the currently executing `handle_msg` would finish before exit. Please double-check that it's acceptable for callers to sometimes see pending subscribe/reconnect requests fail with dropped oneshots during shutdown.

Also applies to: 232-252, 257, 362-370, 376-381
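A hedged sketch of the first option, illustrative only: `establish_subscription` and `handle_control_msg` are hypothetical stand-ins, not functions from this codebase, and the `Msg` variants are simplified.

```rust
use futures_util::stream::{FuturesUnordered, StreamExt};
use tokio::sync::mpsc;

// Simplified stand-ins for the actor's message type and helpers.
enum Msg {
    AccountSubscribe,   // would carry a pubkey and a oneshot responder
    AccountUnsubscribe, // likewise
    Reconnect,
}

// Hypothetical helper covering only the slow part: the RPC
// account_subscribe call plus spawning the update loop. The `subs`
// bookkeeping and oneshot response would happen before this is pushed.
async fn establish_subscription() { /* ... */ }

// Control messages (unsubscribe, reconnect) are handled inline and
// therefore still observe the effects of all earlier messages.
async fn handle_control_msg(_msg: Msg) { /* ... */ }

async fn run_actor(mut messages_receiver: mpsc::Receiver<Msg>) {
    let mut in_flight_subs = FuturesUnordered::new();
    loop {
        tokio::select! {
            maybe_msg = messages_receiver.recv() => match maybe_msg {
                Some(Msg::AccountSubscribe) => {
                    // Record the subscription in `subs` synchronously here,
                    // preserving subscribe→unsubscribe ordering; only the
                    // long-running network work becomes concurrent.
                    in_flight_subs.push(establish_subscription());
                }
                Some(msg) => handle_control_msg(msg).await,
                None => break,
            },
            Some(()) = in_flight_subs.next(), if !in_flight_subs.is_empty() => {}
        }
    }
}
```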
🌐 Web query:
Does `futures_util::stream::FuturesUnordered` provide any guarantee about the order in which newly pushed futures are first polled, relative to each other? Specifically, can a future that was pushed later be polled and complete before a previously pushed future’s first poll?
💡 Result:
Short answer: No — there is no guarantee. FuturesUnordered is unordered: pushing a future does not poll it, and futures are driven only when they generate wake notifications, so a future pushed later can be polled (and even complete) before an earlier-pushed future’s first poll. Use FuturesOrdered if you need FIFO semantics. [1][2]
Sources:
- futures-util/FuturesUnordered docs (behaviour and note that it “may complete in any order”). [1]
- futures::stream::FuturesOrdered docs (contrast: preserves FIFO order). [2]
The web search confirms the review comment's analysis is correct. FuturesUnordered provides no guarantees about polling order—a future pushed later can be polled and complete before an earlier-pushed future's first poll; FuturesOrdered is the correct choice for FIFO semantics. This directly validates the identified race condition where an unsubscribe could complete before a preceding subscribe's critical section (insertion into subs) executes.
The review comment is accurate and comprehensive. The semantic correctness regression is real and actionable.
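As a standalone illustration of the cited behavior (not project code): FuturesUnordered yields results in completion order, while FuturesOrdered buffers faster results to preserve push (FIFO) order.

```rust
use futures_util::stream::{FuturesOrdered, FuturesUnordered, StreamExt};
use std::time::Duration;

// Two futures of the same type with different completion times.
async fn task(delay_ms: u64, label: &'static str) -> &'static str {
    tokio::time::sleep(Duration::from_millis(delay_ms)).await;
    label
}

#[tokio::main]
async fn main() {
    // FuturesUnordered yields in completion order: the later-pushed
    // but faster future comes out first.
    let mut unordered = FuturesUnordered::new();
    unordered.push(task(50, "slow (pushed first)"));
    unordered.push(task(0, "fast (pushed second)"));
    while let Some(label) = unordered.next().await {
        println!("unordered: {label}"); // "fast" prints before "slow"
    }

    // FuturesOrdered preserves FIFO push order, holding back the
    // faster result until the earlier future has completed.
    let mut ordered = FuturesOrdered::new();
    ordered.push_back(task(50, "slow (pushed first)"));
    ordered.push_back(task(0, "fast (pushed second)"));
    while let Some(label) = ordered.next().await {
        println!("ordered: {label}"); // "slow" prints before "fast"
    }
}
```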
Force-pushed: 0fa1d7b to 950590b
* master: perf: allow pubsub actor messages to be handled in parallel (#659)
Summary by CodeRabbit
Allow pubsub actor messages to be handled in parallel instead of sequentially. This improves
throughput by processing multiple messages concurrently rather than waiting for each message to
complete before processing the next one.
Closes: #652
Details
magicblock-chainlink
Modified the message handling loop in ChainPubsubActor to use FuturesUnordered for concurrent message processing (see the schematic comparison below):

- incoming messages are wrapped into handle_msg futures and added to a collection
- the collected futures can then be processed concurrently
- improving overall throughput and responsiveness of the pubsub actor
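For contrast, the previous sequential loop schematically looked like this (a simplified sketch, not the exact removed code):

```rust
// Before (schematic): each message is fully handled before the next
// one is even received, so one slow subscribe stalls the whole queue.
while let Some(msg) = messages_receiver.recv().await {
    handle_msg(msg).await;
}
```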